// 鏁版嵁鎺ユ敹绾跨▼
package com.thread;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.util.concurrent.BlockingQueue;

import com.tool.MessageBuffer;
import com.databus.*;
import com.packet.PackageData;
import com.packet.PackageHeader;

public class MsgReceiveThread implements Runnable {
	
	public MsgReceiveThread(MessageCallBack cb, DataInputStream input, int bufSize,
			BlockingQueue<PackageData> syncDataQueue, BlockingQueue<PackageData> asynDataQueue)
	{
		mCallBack = cb;
		mInput = input;
		mBufferSize = bufSize;
		mPackDataSyncQueue = syncDataQueue;
		mPackDataAsynQueue = asynDataQueue;
	}

	@Override
	public void run() {
		// TODO Auto-generated method stub
		Log.info("Start Message Receive Thread...");
		
		ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
		MessageBuffer buffer = new MessageBuffer(mInput, mBufferSize);
		while(!Thread.interrupted())
		{
			bos.reset();
			try {
				try {
					if (!buffer.readEqual(PackageHeader.flag1)) {
						Log.error("read date error");
						if (null != mCallBack) {
							mCallBack.OnConnectChange(MessageCallBack.EConnectStatus.ConnectStatus_Offline);
						}
						break;
					}
				}
				catch(Exception ex){
					Log.error(ex);
					if (null != mCallBack) {
						mCallBack.OnConnectChange(MessageCallBack.EConnectStatus.ConnectStatus_Offline);
					}
					break;
				}
				byte next = buffer.read();
				if (next != PackageHeader.flag2)
					continue;	
				
				buffer.readFillLength(bos, PackageHeader.SIZE_OF_HEAD - 2);
				byte[] headers = bos.toByteArray();
				bos.reset();
				
				PackageHeader pHeader = new PackageHeader();
				pHeader.unserialize(headers);
				
				if(pHeader.offset < PackageHeader.SIZE_OF_HEAD)
				{
					Log.error("off is less : " + pHeader.offset);
					continue;
				}
				if(pHeader.offset > PackageHeader.SIZE_OF_HEAD)
				{
					Log.debug("off is change, off = " + pHeader.offset);
					if(pHeader.offset > 2 * PackageHeader.SIZE_OF_HEAD)
					{
						Log.warn("PackageHeader off is large, off = " + pHeader.offset);
						continue;
					}
					byte[] temp = new byte[pHeader.offset - PackageHeader.SIZE_OF_HEAD];
					buffer.read(temp);
				}
				
				if(pHeader.bodysize < 0) {
					Log.error("bodysize is nagetive");
					continue;
				}
				
				if(!buffer.readFillLength(bos, pHeader.bodysize))
				{
					Log.error("get message body failed!");
					continue;
				}
				
				PackageData packData = new PackageData();
				packData.mHeader = pHeader;
				packData.mData = bos.toByteArray();
				
				if(pHeader.IsSequence())
				{
					mPackDataSyncQueue.add(packData);
				}
				else
				{
					mPackDataAsynQueue.add(packData);
				}

			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
				
				if(null != mCallBack)
				{
					mCallBack.OnConnectChange(MessageCallBack.EConnectStatus.ConnectStatus_Offline);
				}
				
				break;
			}
		}
		
		Log.info("End Message Receive Thread...");
	}
	

	private MessageCallBack mCallBack = null;             // 鍥炶皟鎺ュ彛
	private DataInputStream mInput;                        // 鎺ユ敹缂撳啿鍖�
	private int mBufferSize;                               // 娑堟伅缂撳啿鍖哄ぇ灏�
	private BlockingQueue<PackageData> mPackDataSyncQueue; // 娑堟伅鎺ユ敹鏃跺簭闃熷垪
	private BlockingQueue<PackageData> mPackDataAsynQueue; // 娑堟伅鎺ユ敹闈炴椂搴忛槦鍒�
}
